package com.atguigu.app.dwd.db;

import com.atguigu.utils.MyKafkaUtil;
import com.atguigu.utils.MysqlUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

//数据流：web/app -> Nginx -> 业务服务器 -> Mysql(Binlog) -> Maxwell -> Kafka(ODS) -> FlinkApp -> Kafka(DWD)
//程  序：  Mock -> Mysql(Binlog) -> Maxwell -> Kafka(ZK) -> DwdTradeCartAdd -> Kafka(ZK)
public class DwdTradeCartAdd {

    public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  //生产环境设置为Kafka的分区数
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //1.1 设置状态后端
        //System.setProperty("HADOOP_USER_NAME", "atguigu");
        //env.setStateBackend(new HashMapStateBackend());
        //env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall-flink/ck");

        //1.2 开启CK
        //env.enableCheckpointing(3 * 60000L);
        //env.getCheckpointConfig().setCheckpointTimeout(5 * 60000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //TODO 2.读取Kafka ODS层 topic_db 主题的数据创建动态表
        tableEnv.executeSql(MyKafkaUtil.getTopicDbDDL("cart_add_211227"));

        //TODO 3.过滤出加购数据
        Table cartInfoTable = tableEnv.sqlQuery("" +
                "select " +
                "    `data`['id'] id, " +
                "    `data`['user_id'] user_id, " +
                "    `data`['sku_id'] sku_id, " +
                //"    `data`['cart_price'], " +
                "    if(`type`='insert',`data`['sku_num'],cast(cast(`data`['sku_num'] as int)-cast(`old`['sku_num'] as int) as string)) sku_num, " +
                //"    `data`['img_url'], " +
                "    `data`['sku_name'] sku_name, " +
                "    `data`['is_checked'] is_checked, " +
                "    `data`['create_time'] create_time, " +
                "    `data`['operate_time'] operate_time, " +
                "    `data`['is_ordered'] is_ordered, " +
                "    `data`['order_time'] order_time, " +
                "    `data`['source_type'] source_type, " +
                "    `data`['source_id'] source_id, " +
                "    pt " +
                "from topic_db " +
                "where `database`='gmall-211227-flink' " +
                "and `table` = 'cart_info' " +
                "and ( " +
                "    `type` = 'insert'  " +
                "    or  " +
                "    (`type` = 'update' and `old`['sku_num'] is not null and cast(`data`['sku_num'] as int) > cast(`old`['sku_num'] as int)) " +
                ")");
        tableEnv.createTemporaryView("cart_info", cartInfoTable);

        //转换为流并打印
        //tableEnv.toAppendStream(cartInfoTable, Row.class).print(">>>>>>>>>>");

        //TODO 4.构建MySQL base_dic LookUp表
        tableEnv.executeSql(MysqlUtil.getBaseDicLookUpDDL());

        //TODO 5.关联加购与码表,做维度退化
        Table resultTable = tableEnv.sqlQuery("" +
                "select " +
                "    ci.id, " +
                "    ci.user_id, " +
                "    ci.sku_id, " +
                "    ci.sku_num, " +
                "    ci.sku_name, " +
                "    ci.is_checked, " +
                "    ci.create_time, " +
                "    ci.operate_time, " +
                "    ci.is_ordered, " +
                "    ci.order_time, " +
                "    ci.source_type, " +
                "    dic.dic_name, " +
                "    ci.source_id " +
                "from cart_info ci " +
                "join base_dic FOR SYSTEM_TIME AS OF ci.pt dic " +
                "on ci.source_type = dic.dic_code");
        tableEnv.createTemporaryView("result_table", resultTable);

        //TODO 6.构建DWD层加购事实表
        tableEnv.executeSql("" +
                "create table dwd_cart_info( " +
                "    `id` String, " +
                "    `user_id` String, " +
                "    `sku_id` String, " +
                "    `sku_num` String, " +
                "    `sku_name` String, " +
                "    `is_checked` String, " +
                "    `create_time` String, " +
                "    `operate_time` String, " +
                "    `is_ordered` String, " +
                "    `order_time` String, " +
                "    `source_type` String, " +
                "    `dic_name` String, " +
                "    `source_id` String " +
                ")" + MyKafkaUtil.getKafkaSinkDDL("dwd_trade_cart_add"));

        //TODO 7.将退化之后的表写出到Kafka
        tableEnv.executeSql("insert into dwd_cart_info select * from result_table");

        //env.execute("DwdTradeCartAdd");

    }

}
